"""
This is a script to submit dmlc job via Yarn
dmlc will run as a Yarn application
"""

# pylint: disable=invalid-name, too-many-locals, too-many-branches, missing-docstring
from __future__ import absolute_import

import logging
import os
import platform
import subprocess
import warnings
from threading import Thread

from . import opts, tracker
from .util import py_str


def yarn_submit(args, nworker, nserver, pass_env):
    """Submission function for YARN."""
    is_windows = os.name == "nt"
    hadoop_home = os.getenv("HADOOP_HOME")
    assert hadoop_home is not None, "Need to set HADOOP_HOME for YARN submission."
    hadoop_binary = os.path.join(hadoop_home, "bin", "hadoop")
    assert os.path.exists(
        hadoop_binary
    ), "HADOOP_HOME does not contain the hadoop binary"

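    # Derive a default job name from the command when none was given.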
    if args.jobname is None:
        if args.num_servers == 0:
            prefix = "DMLC[nworker=%d]:" % args.num_workers
        else:
            prefix = "DMLC[nworker=%d,nsever=%d]:" % (
                args.num_workers,
                args.num_servers,
            )
        args.jobname = prefix + args.command[0].split("/")[-1]

    # Determine the paths of the YARN helper files.
    YARN_JAR_PATH = os.path.join(args.yarn_app_dir, "dmlc-yarn.jar")
    curr_path = os.path.dirname(os.path.abspath(os.path.expanduser(__file__)))
    YARN_BOOT_PY = os.path.join(curr_path, "launcher.py")

    # Build the helper jar on the fly if it has not been compiled yet.
    if not os.path.exists(YARN_JAR_PATH):
        warnings.warn('cannot find "%s", attempting to build it' % YARN_JAR_PATH)
        build_dir = os.path.join(os.path.dirname(__file__), os.pardir, "yarn")
        if is_windows:
            cmd = "cd /d %s && build.bat" % build_dir
        else:
            cmd = "cd %s && ./build.sh" % build_dir
        print(cmd)
        subprocess.check_call(cmd, shell=True, env=os.environ)
        assert os.path.exists(
            YARN_JAR_PATH
        ), "failed to build dmlc-yarn.jar, try building it manually"

    # Detect the hadoop version and query the runtime classpath.
    (out, _) = subprocess.Popen(
        "%s version" % hadoop_binary, shell=True, stdout=subprocess.PIPE
    ).communicate()
    out = py_str(out).split("\n")[0].split()
    assert out[0] == "Hadoop", "cannot parse hadoop version string"
    hadoop_version = int(out[1].split(".")[0])
    (classpath, _) = subprocess.Popen(
        "%s classpath" % hadoop_binary, shell=True, stdout=subprocess.PIPE
    ).communicate()
    classpath = py_str(classpath).strip()

    if hadoop_version < 2:
        raise RuntimeError(
            "Hadoop version is %s; dmlc_yarn requires YARN (Hadoop >= 2.0)" % out[1]
        )

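    # Collect the files that must be shipped to the YARN containers.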
    fset, new_command = opts.get_cache_file_set(args)
    fset.add(YARN_JAR_PATH)
    fset.add(YARN_BOOT_PY)
    ar_list = []

    for fname in args.archives:
        fset.add(fname)
        ar_list.append(os.path.basename(fname))

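    # Prefer the JVM under JAVA_HOME; otherwise rely on java being on PATH.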
    JAVA_HOME = os.getenv("JAVA_HOME")
    if JAVA_HOME is None:
        JAVA = "java"
    else:
        JAVA = os.path.join(JAVA_HOME, "bin", "java")
    cmd = "%s -cp %s%s%s org.apache.hadoop.yarn.dmlc.Client " % (
        JAVA,
        classpath,
        ";" if is_windows else ":",
        YARN_JAR_PATH,
    )
    env = os.environ.copy()
    for k, v in pass_env.items():
        env[k] = str(v)

    # Ship libstdc++.so alongside the job if requested.
    if args.ship_libcxx is not None:
        if platform.architecture()[0] == "64bit":
            libcxx = os.path.join(args.ship_libcxx, "libstdc++.so.6")
        else:
            libcxx = os.path.join(args.ship_libcxx, "libstdc++.so")
        fset.add(libcxx)
        # prepend to the local LD_LIBRARY_PATH so the shipped library is found first
        env["LD_LIBRARY_PATH"] = args.ship_libcxx + ":" + env.get("LD_LIBRARY_PATH", "")

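    # Forward the job configuration to the launched processes through
    # DMLC_* environment variables.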
    env["DMLC_JOB_CLUSTER"] = "yarn"
    env["DMLC_WORKER_CORES"] = str(args.worker_cores)
    env["DMLC_WORKER_MEMORY_MB"] = str(args.worker_memory_mb)
    env["DMLC_SERVER_CORES"] = str(args.server_cores)
    env["DMLC_SERVER_MEMORY_MB"] = str(args.server_memory_mb)
    env["DMLC_NUM_WORKER"] = str(args.num_workers)
    env["DMLC_NUM_SERVER"] = str(args.num_servers)
    env["DMLC_JOB_ARCHIVES"] = ":".join(ar_list)

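    # Assemble the client command line: shipped files, job metadata, and the
    # launcher invocation that runs inside each container.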
    for f in fset:
        cmd += " -file %s" % f
    cmd += " -jobname %s " % args.jobname
    cmd += " -tempdir %s " % args.hdfs_tempdir
    cmd += " -queue %s " % args.queue
    if args.yarn_app_classpath:
        cmd += " -appcp %s " % args.yarn_app_classpath
    for entry in args.env:
        cmd += " -env %s " % entry
    cmd += " ".join(["./launcher.py"] + new_command)

    logging.debug("Submit job with %d workers and %d servers", nworker, nserver)

    def run():
        """internal running function."""
        logging.debug(cmd)
        subprocess.check_call(cmd, shell=True, env=env)

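    # Run the blocking submission call on a daemon thread so the tracker
    # can continue serving the job.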
    thread = Thread(target=run)
    thread.daemon = True
    thread.start()
    return thread


def submit(args):
    """Tracker entry point: submit the job to YARN."""
    submit_thread = []

    def yarn_submit_pass(nworker, nserver, pass_env):
        submit_thread.append(yarn_submit(args, nworker, nserver, pass_env))

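    # launcher.py is shipped with the job and starts the user command in
    # each container.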
    curr_path = os.path.dirname(os.path.abspath(os.path.expanduser(__file__)))
    YARN_BOOT_PY = os.path.join(curr_path, "launcher.py")
    tracker.submit(
        args.num_workers,
        args.num_servers,
        fun_submit=yarn_submit_pass,
        pscmd=(" ".join([YARN_BOOT_PY] + args.command)),
    )
